处理分布式事务(SpringCloud Alibaba Seata) |
您所在的位置:网站首页 › springcloud client › 处理分布式事务(SpringCloud Alibaba Seata) |
前言
一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题 Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 Seata官网 下载 分布式事务过程: 分布式事务处理过程的一ID+三组件模型 Transaction ID XID:全局唯一的事务ID 3组件概念 Transaction Coordinator (TC):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚; Transaction Manager ™:控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议; Resource Manager (RM):控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚 处理过程: TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID; XID 在微服务调用链路的上下文中传播; RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖; TM 向 TC 发起针对 XID 的全局提交或回滚决议; TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
修改conf目录下的seata\conf\file.conf配置文件 主要修改:自定义事务组名称+事务日志存储模式为db+数据库连接信息
mysql5.7数据库新建库seata 在seata库,执行 \seata-server-0.9.0\seata\conf\db_store.sql 修改seata-server-0.9.0\seata\conf目录下的registry.conf配置文件
启动nacos,启动 seata 实际应用这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。 当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存, 再通过远程调用账户服务来扣减用户账户里面的余额, 最后在订单服务中修改订单状态为已完成。 该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。 业务流程:下订单->减库存->扣余额->改(订单)状态 seata_order:存储订单的数据库; seata_storage:存储库存的数据库; seata_account:存储账户信息的数据库。 seata_order库下建t_order订单表 seata_storage库下建t_storage库存表 seata_account库下建t_account 账户表 订单-库存-账户3个库下都需要建各自的回滚日志表,\seata-server-0.9.0\seata\conf目录下的db_undo_log.sql 这里只列举创建订单微服务,代码太多,已上传 gitee 新建Module:seata-order-service2001 pom com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 0.9.0 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 org.springframework.boot spring-boot-starter-test test org.projectlombok lombok true 复制代码yml server: port: 2001 spring: application: name: seata-order-service cloud: alibaba: seata: #自定义事务组名称需要与seata-server中的对应 tx-service-group: fsp_tx_group nacos: discovery: server-addr: localhost:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order username: root password: 123456 feign: hystrix: enabled: false logging: level: io: seata: info mybatis: mapperLocations: classpath:mapper/*.xml 复制代码resources/file.conf transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { vgroup_mapping.fsp_tx_group = "default" #修改自定义事务组名称 default.grouplist = "127.0.0.1:8091" enableDegrade = false disable = false max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" disableGlobalTransaction = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } ## transaction log store store { ## store mode: file、db mode = "db" ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.jdbc.Driver" url = "jdbc:mysql://127.0.0.1:3306/seata" #seata的数据库 user = "root" password = "123456" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } } 复制代码resources/registry.conf registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" ##注册类型 nacos { serverAddr = "localhost:8848" namespace = "" cluster = "default" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "localhost" namespace = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { app.id = "seata-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } } 复制代码OrderDao @Mapper public interface OrderDao { /** * 创建订单 */ void create(Order order); /** * 修改订单金额 */ void update(@Param("userId") Long userId, @Param("status") Integer status); } 复制代码resources/mapper/OrderMapper.xml INSERT INTO `t_order` (`id`, `user_id`, `product_id`, `count`, `money`, `status`) VALUES (NULL, #{userId}, #{productId}, #{count}, #{money}, 0); UPDATE `t_order` SET status = 1 WHERE user_id = #{userId} AND status = #{status}; 复制代码OrderService public interface OrderService { /** * 创建订单 */ void create(Order order); } 复制代码OrderServiceImpl import com.atguigu.springcloud.alibaba.dao.OrderDao; import com.atguigu.springcloud.alibaba.domain.Order; import com.atguigu.springcloud.alibaba.service.AccountService; import com.atguigu.springcloud.alibaba.service.OrderService; import com.atguigu.springcloud.alibaba.service.StorageService; import io.seata.spring.annotation.GlobalTransactional; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Resource; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; /** * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态 * 简单说: * 下订单->减库存->减余额->改状态 */ @Override @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class) public void create(Order order) { log.info("------->下单开始"); //本应用创建订单 orderDao.create(order); //远程调用库存服务扣减库存 log.info("------->order-service中扣减库存开始"); storageService.decrease(order.getProductId(),order.getCount()); log.info("------->order-service中扣减库存结束"); //远程调用账户服务扣减余额 log.info("------->order-service中扣减余额开始"); accountService.decrease(order.getUserId(),order.getMoney()); log.info("------->order-service中扣减余额结束"); //修改订单状态为已完成 log.info("------->order-service中修改订单状态开始"); orderDao.update(order.getUserId(),0); log.info("------->order-service中修改订单状态结束"); log.info("------->下单结束"); } } 复制代码StorageService @FeignClient(value = "seata-storage-service") public interface StorageService { /** * 扣减库存 */ @PostMapping(value = "/storage/decrease") CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count); } 复制代码AccountService @FeignClient(value = "seata-account-service") public interface AccountService { /** * 扣减账户余额 */ //@RequestMapping(value = "/account/decrease", method = RequestMethod.POST, produces = "application/json; ") @PostMapping("/account/decrease") CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money); } 复制代码controller @RestController public class OrderController { @Autowired private OrderService orderService; /** * 创建订单 */ @GetMapping("/order/create") public CommonResult create( Order order) { orderService.create(order); return new CommonResult(200, "订单创建成功!"); } } 复制代码config.MyBatisConfig.java @Configuration @MapperScan({"com.atguigu.springcloud.alibaba.dao"}) public class MyBatisConfig { } 复制代码config.DataSourceProxyConfig.java import com.alibaba.druid.pool.DruidDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.transaction.SpringManagedTransactionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import javax.sql.DataSource; /** * @auther zzyy * @create 2019-12-11 16:58 * 使用Seata对数据源进行代理 */ @Configuration public class DataSourceProxyConfig { @Value("${mybatis.mapperLocations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); } @Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); } } 复制代码主启动: @EnableDiscoveryClient @EnableFeignClients @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建 public class SeataOrderMainApp2001 { public static void main(String[] args) { SpringApplication.run(SeataOrderMainApp2001.class, args); } } 复制代码库存、账户服务类似。 下订单入口添加了@GlobalTransactional,要么全执行,要么回滚,事务没啥可说的,下边主要看看实现的原理 补充Simple Extensible Autonomous Transaction Architecture,简单可扩展自治事务框架 分布式事务的执行流程: TM 开启分布式事务(TM 向 TC 注册全局事务记录); 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 ); TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务); TC 汇总事务信息,决定分布式事务是提交还是回滚; TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。Seata有四大模式,默认AT模式
完整代码已上传:gitee |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |